 1.Flink状态化流处理框架之Flink快速应用中单词统计案例(批数据)
   
   通过一个单词统计的案例,快速上手应用Flink,进行流处理(Streaming)和批处理(Batch)
   1).需求
   统计一个文件中各个单词出现的次数，把统计结果输出到文件
   步骤：
   1、 读取数据源
   2、 处理数据源
   a、 将读到的数据源文件中的每一行根据空格切分
   b、 将切分好的每个单词拼接1
   c、 根据单词聚合（将相同的单词放在一起）
   d、 累加相同的单词（单词后面的1进行累加）
   3、 保存处理结果
   2).代码实现
   引入依赖
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>   
   Java程序
package com.lagou;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 *  1、 读取数据源
 *  2、 处理数据源
 *  a、 将读到的数据源文件中的每一行根据空格切分
 *  b、 将切分好的每个单词拼接1
 *  c、 根据单词聚合（将相同的单词放在一起）
 *  d、 累加相同的单词（单词后面的1进行累加）
 *  3、 保存处理结果
 */
public class WordCountJavaBatch {
    public static void main(String[] args) throws Exception {
        String inputPath ="D:\\data\\input\\hello.txt";
        String outputPath="D:\\data\\ouput";

        //获取flink的运行环境
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = executionEnvironment.readTextFile(inputPath);
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = text.flatMap(new SplitClz());

        //(hello 1) (you 1) (hi 1) (him 1)
        UnsortedGrouping<Tuple2<String, Integer>> groupedWordAndOne = wordAndOne.groupBy(0);
        //(hello 1) (hello 1)
        AggregateOperator<Tuple2<String, Integer>> out = groupedWordAndOne.sum(1);

        out.writeAsCsv(outputPath, "\n", " ").setParallelism(1);

        executionEnvironment.execute();
    }

    static class SplitClz implements FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] s1 = s.split(" ");
            for (String word : s1) {
                collector.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

   
   scala程序
import org.apache.flink.api.scala._

/**
 * Batch word count using the Flink DataSet API (Scala).
 *
 * Reads a text file, splits lines into words, counts occurrences per word,
 * and writes the (word, count) pairs to the output directory as CSV.
 */
object WordCountScalaBatch {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\data\\input\\hello.txt"
    // Fixed path typo: "ouput" -> "output".
    val outputPath = "D:\\data\\output"

    // Batch execution environment.
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = environment.readTextFile(inputPath)

    // Split on whitespace runs (split(" ") would emit empty tokens for
    // repeated spaces), drop any empties, pair each word with 1,
    // group by word (field 0) and sum the counts (field 1).
    val out: AggregateDataSet[(String, Int)] = text
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // Parallelism 1 yields a single output file.
    out.writeAsCsv(outputPath, "\n", " ").setParallelism(1)
    // Lazy evaluation: execute() actually triggers the job.
    environment.execute("scala batch process")
  }
}
   
 2.单词统计案例(流数据)
   
   1).需求
   Socket模拟实时发送单词，使用Flink实时接收数据，对指定时间窗口内（如5s）的数据进行聚合
统计，每隔1s汇总计算一次，并且把时间窗口内计算结果打印出来。
   2).代码实现
   scala程序
import org.apache.flink.streaming.api.scala._

/**
 * Streaming word count using the Flink DataStream API (Scala).
 *
 * Reads whitespace-separated words from a socket ("linux121":7777),
 * keeps a running count per word, and prints each updated count.
 */
object WordCountScalaStream {
  def main(args: Array[String]): Unit = {
    // Streaming execution environment.
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Unbounded source: one element per line received on the socket.
    val streamData: DataStream[String] = environment.socketTextStream("linux121", 7777)

    // Split on whitespace runs and discard empty tokens (split(" ") would
    // count runs of spaces as empty "words"); then key by the word and
    // keep a running sum of the 1s.
    val out = streamData
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    out.print()
    // Streaming jobs run until cancelled; execute() starts the pipeline.
    environment.execute()
  }
}
   
   java程序
package com.lagou;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJavaStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = environment.socketTextStream("linux121", 7777);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }).keyBy(0).sum(1);
        sum.print();
        environment.execute();
    }
}
   
   Flink程序开发的流程总结如下：
   (1).获得一个执行环境
   (2).加载/创建初始化数据
   (3).指定数据操作的算子
   (4).指定结果数据存放位置
   (5).调用execute()触发执行程序
   注意：Flink程序是延迟计算的，只有最后调用execute()方法的时候才会真正触发执行程序